package com.carol.bigdata.task.label

import com.carol.bigdata.Config
import com.carol.bigdata.constant.KVConstant
import com.carol.bigdata.utils.{Flag, HBaseUtil, RddReader, TimeUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object calRetentionLabel {
    // Hive source table holding raw player action events.
    val table: String = KVConstant.actionTable
    // Pattern used to filter login events out of the action table.
    val loginPattern: String = KVConstant.loginPattern
    // Key columns plus the event-time column to read from Hive.
    val columns: List[String] = KVConstant.keyColumns :+ "time"
    // Target HBase table that receives the retention labels.
    val retentionTable: String = "retention_label"
    // Column families of the target table: row-key info and retention labels.
    val familyList: List[String] = List("key", "retention")

    /**
     * Computes N-day retention labels for one game on one stat day and writes
     * them to HBase.
     *
     * For each offset `i` in `retentionLabels`, a player who logged in on
     * `statDay - i` days gets label 1 if they also logged in on `statDay`,
     * otherwise 0. Each output row is keyed by the player key plus `lastDay`.
     *
     * @param hbaseParams     HBase connection parameters
     * @param spark           active SparkSession with Hive support enabled
     * @param statDay         the "current" day (format: yyyy-MM-dd)
     * @param game            game id used to filter the source table
     * @param retentionLabels day offsets to compute (e.g. 1-day, 3-day retention)
     */
    def run(hbaseParams: Map[String, String],
            spark: SparkSession,
            statDay: String,
            game: String,
            retentionLabels: Array[Int]): Unit = {
        // 1. Distinct players that logged in on statDay.
        val loginRDD = RddReader.readHiveRDD(spark, table, statDay, loginPattern, game, columns)
          .map(x => (x.take(2), 1))
          .reduceByKey((_, _) => 1) // dedupe: keep a single record per key
        println("loginRDD:", loginRDD.count())
        loginRDD.take(5).foreach(println)
        // Ensure the target HBase table exists before any writes.
        HBaseUtil.createTable(hbaseParams, retentionTable, familyList)
        // 2. Compute each requested N-day retention label.
        for (i <- retentionLabels) {
            val lastDay = TimeUtil.getTimeDeltaDay(statDay, -i)
            // Distinct players that logged in i days before statDay.
            val lastLoginRDD = RddReader.readHiveRDD(spark, table, lastDay, loginPattern, game, columns)
              .map(x => (x.take(2), 1))
              .reduceByKey((_, _) => 1)
            println("lastLoginRDD:", lastLoginRDD.count())
            lastLoginRDD.take(5).foreach(println)
            // Left outer join: every player active on lastDay is labelled 1 when
            // also active on statDay (Some(1)), 0 otherwise (None -> getOrElse(0)).
            // NOTE: the key already holds exactly the first two columns, so the
            // original redundant `_.take(2)` on it was dropped.
            val retRDD: RDD[(List[String], List[Int])] = lastLoginRDD.leftOuterJoin(loginRDD)
              .map { case (key, (present, retained)) =>
                  (key :+ lastDay, List(present * retained.getOrElse(0)))
              }
            println("retRDD:", retRDD.count())
            retRDD.take(5).foreach(println)
            // 3. Persist labels, one qualifier per offset (e.g. "active_r1").
            val labels = List(s"active_r${i}")
            HBaseUtil.writeIntRDD2KvCF(hbaseParams, retentionTable, retRDD, familyList.head, familyList.last, columns, labels)
        }
    }

    def main(args: Array[String]): Unit = {
        Flag.Parse(args)
        val spark = SparkSession.builder()
          .config(conf = Config.sparkConf)
          .enableHiveSupport()
          .getOrCreate()
        // Hive address settings must be applied after the SparkSession is created;
        // otherwise the various config files may overwrite them and stall the job.
        // spark.sparkContext.getConf
        //    .set("hive.metastore.uris", Config.HiveMetaStoreUris)
        //    .set("spark.sql.warehouse.dir", Config.sparkWarehouseDir)
        val retentionLabels = Array(1, 3)
        val sc = spark.sparkContext
        sc.setLogLevel("ERROR")
        // FIX: the original `sc.broadcast(...).value` broadcast-then-dereference
        // on the driver was a no-op that also leaked the broadcast variable;
        // the map is passed directly (same value, no leaked resource).
        val hbaseParams: Map[String, String] = Config.hbaseParams
        val gameList = List("5")
        for (day <- 27 to 27) {
            val statDay = "2022-01-%02d".format(day)
            for (game <- gameList) {
                run(hbaseParams, spark, statDay, game, retentionLabels)
            }
        }
        spark.stop()
    }

}
